{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Extension methods\n",
    "Not all operators are loaded when `rx` is imported."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "error: type object 'Observable' has no attribute 'from_marbles'\n"
     ]
    }
   ],
   "source": [
    "# Example: from_marbles\n",
    "import rx\n",
    "try:\n",
    "    rx.Observable.from_marbles('a-b|')\n",
    "except Exception as ex:\n",
    "    print('error: %s' % ex)  # only raised after a kernel restart, i.e. before rx.testing.marbles has been imported"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "a\n",
      "b\n",
      "c\n"
     ]
    }
   ],
   "source": [
    "# To see what is available, `dir(Observable)` does not help, because it only\n",
    "# lists operators that are already loaded. Instead, search the rx directory\n",
    "# for 'def from_marbles' to find the defining module, then import it:\n",
    "'''\n",
    "~/GitHub/RxPY/rx $ ack 'def from_marbl'\n",
    "testing/marbles.py\n",
    "90:def from_marbles(self, scheduler=None):\n",
    "'''\n",
    "import rx.testing.marbles\n",
    "def show(x): print (x)\n",
    "stream = rx.Observable.from_marbles('a-b--c|').to_blocking().subscribe(show)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "# Async Operations\n",
    "It is useful to understand, at a high level, how and when RxPY handles asynchrony.\n",
    "E.g. you might naively want to know, when a value is emitted to a subscriber, which other subscribers are present.  \n",
    "That question makes no sense to ask (I think this holds for reactive programming in general), as the following example makes clear.    \n",
    "\n",
    "Consider timing and thread outputs in the following:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 38,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "1482356523.29513192 -   AutoDetachObserver 275507917 - MainThread\n",
      "\n",
      "1482356523.29556108 -                   Observer 1 1 - MainThread\n",
      "\n",
      "1482356523.29563403 -                   Observer 1 1 - MainThread\n",
      "\n",
      "1482356523.29568505 -                   Observer 1 1 - MainThread\n",
      "\n",
      "1482356523.29573011 -                   Observer 1 1 - MainThread\n",
      "\n",
      "1482356523.29649496 -   AutoDetachObserver 275507817 - MainThread\n",
      "\n",
      "1482356523.29657793 -                   Observer 2 1 - MainThread\n",
      "\n",
      "1482356523.29663706 -                   Observer 2 1 - MainThread\n",
      "\n",
      "1482356523.29668999 -                   Observer 2 1 - MainThread\n",
      "\n",
      "1482356523.29674101 -                   Observer 2 1 - MainThread\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# =============================\n",
    "# change these (both in millis)\n",
    "delay_stream, slow_emit = 0, 0\n",
    "# =============================\n",
    "\n",
    "import rx, threading, random, time\n",
    "thread = threading.currentThread\n",
    "\n",
    "\n",
    "def call_observer(obs):\n",
    "    '''observer functions are invoked, blocking'''\n",
    "    print_out(obs.__class__.__name__, hash(obs))\n",
    "    for i in range(2):\n",
    "        obs.on_next(1)\n",
    "        if slow_emit:\n",
    "            time.sleep(slow_emit / 1000.0)  # millis -> seconds; float division, also under Python 2\n",
    "        obs.on_next(1)\n",
    "        \n",
    "stream = rx.Observable.create(call_observer).take(10)\n",
    "if delay_stream:\n",
    "    stream = stream.delay(delay_stream)\n",
    "\n",
    "def print_out(*v):\n",
    "    '''printout of current time, v, and current thread'''\n",
    "    v_pretty = ' '.join([str(s) for s in v])\n",
    "    print ('%.8f - %30s - %s\\n' % (time.time(), v_pretty, thread().getName()))\n",
    "    \n",
    "    \n",
    "d = stream.subscribe(lambda x: print_out('Observer 1', x))\n",
    "d = stream.subscribe(lambda x: print_out('Observer 2', x))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "- As long as no time-related stream operator is involved, RxPY does everything *synchronously*.\n",
    "- RxPY goes async only when it has to, according to the nature of the async operation declared by the user.\n",
    "- It defaults to reasonable mechanics, e.g. using threading.\n",
    "- You can override these defaults by picking a \"scheduler\" (e.g. gevent, twisted, futures).\n",
    "\n",
    "> => In the `call_observer` function you can't know about the concurrency situation.\n",
    ">    It depends solely on the design of the stream operations applied.\n",
    ">    See `.ref_count()` though, for published streams.\n",
    "\n",
    "Check the [`.observe_on`](./Part%20VII%20-%20Meta%20Operations.ipynb#...when-it-notifies-observers-observe_on) example to get a deeper understanding of how scheduling works.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.12"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
